package consul

import (
	"errors"
	"fmt"
	"gitee.com/jiewen/go-toolkit/conf"
	"github.com/hashicorp/consul/api"
	"github.com/hashicorp/consul/api/watch"
	"github.com/satori/go.uuid"
	"math/rand"
	"net"
	"os"
	"strconv"
	"sync"
)

// Consul is the full client surface exposed by this package. It is the union
// of the Service interface and the KV interface (KV is declared outside this
// section of the file).
type Consul interface {
	Service
	KV
}

// Service groups the service-registration and service-discovery operations.
type Service interface {
	// RegisterApiService registers the current process as a service instance
	// with the Consul agent, including an HTTP health check.
	RegisterApiService() error
	// DeRegisterApiService removes the current instance's registration from
	// the Consul agent.
	DeRegisterApiService() error
	// GetService returns the cached "host:port" addresses known for the named
	// service, or nil when none are cached.
	GetService(name string) []string
	// BalancedGet returns one cached address of the named service picked at
	// random, or "" when none are cached.
	BalancedGet(name string) string
	// Watch starts a background watch on the named service and waits for its
	// first update before returning.
	Watch(name string) error
	// Close stops every watch started through Watch.
	Close()
}

// consul is the package's implementation of the Service methods. It caches
// the addresses of watched services and keeps track of the running watch
// plans so they can be stopped on Close.
type consul struct {
	// conf is the configuration handle (type from the go-toolkit conf package).
	conf        *conf.Conf
	// agentAddr is the address of the Consul agent used for API calls and watches.
	agentAddr   string
	// serverIp and serverPort are the address this instance registers itself under.
	serverIp    string
	serverPort  int
	// serviceId uniquely identifies this instance's registration.
	serviceId   string
	// serviceName is the name this instance registers under and watches in New.
	serviceName string
	// client is created lazily by RegisterApiService / DeRegisterApiService.
	client      *api.Client
	// services maps a service name to its cached "host:port" addresses.
	services    map[string][]string
	// The embedded RWMutex guards services.
	sync.RWMutex
	// wps holds every watch plan created by watch; wpsLock guards it.
	wps     []*watch.Plan
	wpsLock sync.RWMutex
}

// New builds a Consul implementation from conf and the process environment
// and starts watching the instance's own service name.
//
// The agent address, server IP, server port and service name are resolved by
// the set* helpers (environment variable first, configuration as fallback);
// the service ID is freshly generated. New waits for the initial Watch call
// to return and propagates its error, in which case the returned Consul is nil.
func New(conf *conf.Conf) (Consul, error) {
	c := &consul{
		// Populate the conf field so it is not left nil on the returned instance.
		conf:     conf,
		services: make(map[string][]string),
	}
	c.agentAddr = c.setAgentAddr(conf)
	c.serverIp = c.setServerIP(conf)
	c.serverPort = c.setServerPort(conf)
	c.serviceId = c.setServiceId()
	c.serviceName = c.setServiceName(conf)

	// Watch our own service name so the address cache is filled from the start.
	if err := c.Watch(c.serviceName); err != nil {
		return nil, err
	}
	return c, nil
}

// RegisterApiService registers this instance with the Consul agent.
//
// The registration carries the instance's ID, name, address and port, plus an
// HTTP health check against /health on that same address (10s interval, 30s
// timeout, deregistration after 3m in critical state). The API client is
// created on first use.
//
// It returns an error when the server port is zero, when the client cannot be
// created, or when the ServiceRegister call fails.
func (c *consul) RegisterApiService() error {
	if c.serverPort == 0 {
		return errors.New("server port empty")
	}

	if c.client == nil {
		client, err := c.newClient()
		if err != nil {
			return err
		}
		c.client = client
	}

	// JoinHostPort brackets IPv6 literals, which a plain "%s:%d" does not, so
	// the health-check URL stays well-formed for either address family.
	hostPort := net.JoinHostPort(c.serverIp, strconv.Itoa(c.serverPort))

	return c.client.Agent().ServiceRegister(&api.AgentServiceRegistration{
		ID:      c.serviceId,
		Name:    c.serviceName,
		Address: c.serverIp,
		Port:    c.serverPort,
		Check: &api.AgentServiceCheck{
			Interval:                       "10s",
			HTTP:                           fmt.Sprintf("http://%s/health", hostPort),
			Timeout:                        "30s",
			DeregisterCriticalServiceAfter: "3m",
		},
		Tags: []string{"导入单位组织"},
	})
}

// DeRegisterApiService removes this instance's registration (looked up by its
// service ID) from the Consul agent. When no API client exists yet, one is
// created and stored first; a failure to create it is returned as-is.
func (c *consul) DeRegisterApiService() error {
	if c.client != nil {
		return c.client.Agent().ServiceDeregister(c.serviceId)
	}

	cli, err := c.newClient()
	if err != nil {
		return err
	}
	c.client = cli

	return c.client.Agent().ServiceDeregister(c.serviceId)
}

// GetService returns all cached addresses of the named service.
//
// The result is nil when the service is unknown or has no addresses. The
// returned slice is a copy, so callers may sort or modify it freely without
// affecting the shared cache that other goroutines read under the lock.
func (c *consul) GetService(name string) []string {
	c.RLock()
	defer c.RUnlock()

	// Indexing a missing key yields a nil slice, so one length check covers
	// both "unknown service" and "known but empty".
	cached := c.services[name]
	if len(cached) == 0 {
		return nil
	}

	out := make([]string, len(cached))
	copy(out, cached)
	return out
}

// BalancedGet picks one cached address of the named service at random.
// It returns "" when the service is unknown or has no cached addresses.
func (c *consul) BalancedGet(name string) string {
	c.RLock()
	defer c.RUnlock()

	// A missing key yields a nil slice whose length is zero.
	pool := c.services[name]
	n := len(pool)
	if n == 0 {
		return ""
	}

	pick := rand.Intn(n)
	return pool[pick]
}

// newClient creates a Consul API client whose only explicit setting is the
// agent address held on the receiver.
func (c *consul) newClient() (*api.Client, error) {
	cfg := &api.Config{Address: c.agentAddr}
	return api.NewClient(cfg)
}

// Watch starts a background watch on the named service and waits until either
// the first update has been applied to the cache or the watch has ended.
//
// It returns nil once the first update arrives. If the watch ends before any
// update (for example because the watch parameters cannot be parsed), the
// error returned by the watch is reported instead.
func (c *consul) Watch(name string) error {
	var (
		// Both channels are buffered so neither the watch handler nor the
		// goroutine below can block on a send after Watch has returned.
		doneCh = make(chan struct{}, 1)
		errCh  = make(chan error, 1)
	)

	go func() {
		errCh <- c.watch(name, doneCh)
	}()

	// Both outcomes are awaited: if only the first-update signal were awaited,
	// an early failure would leave the caller blocked forever. Passing the
	// error over a channel also avoids sharing a variable between goroutines.
	select {
	case <-doneCh:
		return nil
	case err := <-errCh:
		return err
	}
}

// watch builds a "service" watch plan for name (passing instances only),
// records it on the receiver so Close can stop it, and runs it against the
// agent address.
//
// The plan's handler forwards every []*api.ServiceEntry update to onUpdate
// and, after the first such update only, sends one value on doneCh. Updates
// of any other type are ignored. A parse failure is returned without touching
// doneCh; otherwise the result of Run is returned.
func (c *consul) watch(name string, doneCh chan struct{}) error {
	plan, err := watch.Parse(map[string]interface{}{
		"type":        "service",
		"service":     name,
		"datacenter":  "",
		"passingonly": true,
	})
	if err != nil {
		return err
	}

	var first sync.Once
	signal := func() { doneCh <- struct{}{} }

	plan.Handler = func(_ uint64, raw interface{}) {
		entries, ok := raw.([]*api.ServiceEntry)
		if ok {
			c.onUpdate(name, entries)
			// Only the first successful update notifies the waiting caller.
			first.Do(signal)
		}
	}

	c.wpsLock.Lock()
	c.wps = append(c.wps, plan)
	c.wpsLock.Unlock()

	return plan.Run(c.agentAddr)
}

// onUpdate replaces the cached address list of the named service with the
// de-duplicated "host:port" addresses derived from ses, preserving the order
// in which they first appear.
//
// Entries that are nil or carry no Service are skipped. When a service has no
// address of its own, the address of the node it runs on is used instead,
// which is how Consul defines an empty service address.
func (c *consul) onUpdate(name string, ses []*api.ServiceEntry) {
	var (
		addresses = make([]string, 0, len(ses))
		seen      = make(map[string]struct{}, len(ses)) // de-duplication set
	)

	for _, se := range ses {
		if se == nil || se.Service == nil {
			continue
		}

		host := se.Service.Address
		if host == "" && se.Node != nil {
			host = se.Node.Address
		}

		addr := net.JoinHostPort(host, strconv.Itoa(se.Service.Port))
		if _, dup := seen[addr]; dup {
			continue
		}

		seen[addr] = struct{}{}
		addresses = append(addresses, addr)
	}

	c.Lock()
	defer c.Unlock()

	// The slice is swapped wholesale rather than edited in place.
	c.services[name] = addresses
}

// setAgentAddr resolves the Consul agent address: the AGENT_ADDR environment
// variable is used when non-empty, otherwise the "consul.addr" config value.
// Despite its name it only returns the value; the caller stores it.
func (c *consul) setAgentAddr(conf *conf.Conf) string {
	if fromEnv := os.Getenv("AGENT_ADDR"); fromEnv != "" {
		return fromEnv
	}
	return conf.GetString("consul.addr")
}

// setServerIP resolves the IP this instance registers under: the SERVER_IP
// environment variable is used when non-empty, otherwise the
// "consul.server_ip" config value. It only returns the value; the caller
// stores it.
func (c *consul) setServerIP(conf *conf.Conf) string {
	if fromEnv := os.Getenv("SERVER_IP"); fromEnv != "" {
		return fromEnv
	}
	return conf.GetString("consul.server_ip")
}

// setServerPort resolves the port this instance registers under: the
// SERVER_PORT environment variable is used when it converts to a non-zero
// integer, otherwise the "consul.server_port" config value. It only returns
// the value; the caller stores it.
func (c *consul) setServerPort(conf *conf.Conf) int {
	// The conversion error is deliberately ignored: a missing or non-numeric
	// SERVER_PORT converts to 0, which is treated as "not set".
	if fromEnv, _ := strconv.Atoi(os.Getenv("SERVER_PORT")); fromEnv != 0 {
		return fromEnv
	}
	return conf.GetInt("consul.server_port")
}

// setServiceId returns a new version-4 UUID in string form, used as the
// registration ID of this instance. It only returns the value; the caller
// stores it.
func (c *consul) setServiceId() string {
	id := uuid.NewV4()
	return id.String()
}

// setServiceName resolves the name this instance registers under: the
// SERVER_NAME environment variable is used when non-empty, otherwise the
// "consul.server_name" config value. It only returns the value; the caller
// stores it.
func (c *consul) setServiceName(conf *conf.Conf) string {
	if fromEnv := os.Getenv("SERVER_NAME"); fromEnv != "" {
		return fromEnv
	}
	return conf.GetString("consul.server_name")
}

// Close stops every watch plan recorded on the receiver. The plan list is
// only read, so the read side of wpsLock is held for the duration.
func (c *consul) Close() {
	c.wpsLock.RLock()
	defer c.wpsLock.RUnlock()

	for i := range c.wps {
		c.wps[i].Stop()
	}
}
